Executors创建的4种线程池的使用 - StringBuilder_Sun - 博客园

创建时间:2018/12/21 9:12
来源:https://www.cnblogs.com/ljp-sun/p/6580147.html

导航

<2018年12月>
2526272829301
2345678
9101112131415
16171819202122
23242526272829
303112345

公告

昵称:StringBuilder_Sun
园龄:3年8个月
粉丝:2
关注:7

统计

  • 随笔 - 21
  • 文章 - 0
  • 评论 - 3
  • 引用 - 0

搜索

 
 

最新评论

推荐排行榜

Executors创建的4种线程池的使用

Java通过Executors提供四种线程池,分别为:
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

注意:线程池只是为了控制应用中处理某项业务中防止高并发问题带来的线程不安全的发生的概率。在我目前的测试用,还没有发现线程可以重用这个概念,因为线程开启后,用完就关闭了,不可以再次开启的,查看源码发现会每次新创建一个线程用来处理业务。我们可以通过线程池指定处理这项业务最大的同步线程数,比如:Executors.newFixedThreadPool(3);在线程池中保持三个线程可以同时执行,但是注意,并不是说线程池中永远都是这三个线程,只是说可以同时存在的线程数,当某个线程执行结束后,会有新的线程进来。newFixedThreadPool.execute(new ThreadForpools());这句话的含义并不是添加新的线程,而是添加新的处理业务请求进来。至少我当前是这么理解的,没有发现线程可以重复使用。

处理线程代码:

package com.alivn.sockets;
/**
 * Created by Alivn on 2017/3/19.
 */
public class ThreadForpools implements Runnable{

    private Integer index;
    public  ThreadForpools(Integer index)
    {
     this.index=index;
    }
    @Override
    public void run() {
        /***
         * 业务......省略
          */
        try {
            System.out.println("开始处理线程!!!");
            Thread.sleep(index*100);
            System.out.println("我的线程标识是:"+this.toString());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

(1) newCachedThreadPool
创建一个可缓存线程池,应用中存在的线程数可以无限大

示例代码如下:

package com.alivn.sockets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Created by Alivn on 2017/3/19.
 */
public class Threadpools {

    /**
     * 我们获取四次次线程,观察4个线程地址
     * @param args
     */
    public static  void main(String[]args)
    {
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        System.out.println("****************************newCachedThreadPool*******************************");
        for(int i=0;i<4;i++)
        {
            final int index=i;
          newCachedThreadPool.execute(new ThreadForpools(index));
        }
    }
}

输出结果是:可以有无限大的线程数进来(线程地址不一样)

(2) newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。示例代码如下:

package com.alivn.sockets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Created by Alivn on 2017/3/19.
 */
public class Threadpools {

    /**
     * 我们获取四次次线程,观察4个线程地址
     * @param args
     */
    public static  void main(String[]args)
    {
        //线程池允许同时存在两个线程
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        System.out.println("****************************newFixedThreadPool*******************************");
        for(int i=0;i<4;i++)
        {
            final int index=i;
            newFixedThreadPool.execute(new ThreadForpools(index));
        }
    }
}

输出结果:每次只有两个线程在处理,当第一个线程执行完毕后,新的线程进来开始处理(线程地址不一样)

 

(3)  newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。延迟执行示例代码如下:

package com.alivn.sockets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Created by Alivn on 2017/3/19.
 */
public class Threadpools {

    /**
     * 我们获取四次次线程,观察4个线程地址
     * @param args
     */
    public static  void main(String[]args)
    {
        ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(2);
        System.out.println("****************************newFixedThreadPool*******************************");
        for(int i=0;i<4;i++)
        {
            final int index=i;
            //延迟三秒执行
            newScheduledThreadPool.schedule(new ThreadForpools(index),3, TimeUnit.SECONDS);
        }
    }
}

执行结果:延迟三秒之后执行,除了延迟执行之外和newFixedThreadPool基本相同,可以用来执行定时任务

4) newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。示例代码如下:

package com.alivn.sockets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Created by Alivn on 2017/3/19.
 */
public class Threadpools {

    /**
     * 我们获取四次次线程,观察4个线程地址
     * @param args
     */
    public static  void main(String[]args)
    {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        System.out.println("****************************newFixedThreadPool*******************************");
        for(int i=0;i<4;i++)
        {
            final int index=i;
            newSingleThreadExecutor.execute(new ThreadForpools(index));
        }
    }
}

执行结果:只存在一个线程,顺序执行

posted on 2017-03-19 12:48 StringBuilder_Sun 阅读(28844) 评论(3) 编辑 收藏

评论

#1楼 2018-09-27 10:32 王磊的博客  

jdk8增加了newWorkStealingPool(int parall),增加并行处理任务的线程池,不能保证处理的顺序。
  

#2楼 2018-12-19 17:13 _Breeze  

Worker线程还是会被重用的,当workQueue中有任务等待的时候,一旦有空闲的Worker,这些空闲Worker就会不断地从等待队列取出任务进行执行,任务会在Worker的线程中执行,具体的执行逻辑是直接调用Runnable类型任务的run()方法,因此Worker线程其实是被重用了;但当核心线程池没有满的时候,此时每次添加任务创建的都是新的Worker,当等待队列满了,但Worker数量没有达到maximumPoolSize时的情况也是一样。

你这个打印线程标识的方式是不对的,应该打印线程的内存地址,用System.out.println(System.identityHashCode(Thread.currentThread()));。

具体可以看runWorker(Worker)方法的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
final void runWorker(Worker w) {
    // 引用worker的firstTask任务,并清除worker的firstTask
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 用于标识worker是不是因异常而死亡
    boolean completedAbruptly = true;
    try {
        // worker取任务执行
        while (task != null || (task = getTask()) != null) {
            // 加锁
            w.lock();
            clearInterruptsForTaskRun();
            try {
                // 执行beforeExecute()钩子方法
                beforeExecute(w.thread, task);
                // 用于记录运行过程中的异常
                Throwable thrown = null;
                try {
                    // 执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    throw new Error(x);
                } finally {
                    // 执行afterExecute()钩子方法
                    afterExecute(task, thrown);
                }
            } finally {
                // 将执行完的任务清空
                task = null;
                // 将worker的完成任务数加1
                w.completedTasks++;
                // 解锁
                w.unlock();
            }
        }
        // 运行到这里表示运行过程中没有出现异常
        completedAbruptly = false;
    } finally {
        // 调用processWorkerExit()方法处理Worker的后续清理和退出流程
        processWorkerExit(w, completedAbruptly);
    }
}
  

#3楼 2018-12-19 17:55 _Breeze  

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.Random;
import java.util.concurrent.ExecutorService;
 
class TestTask implements Runnable {
     
    @Override
    public void run() {
        try {
            Thread.sleep(new Random().nextInt(3000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(System.identityHashCode(Thread.currentThread()));
    }
}
 
public class ExecutorsTest {
     
    public static void main(String[] args) {
         
        ExecutorService executorService = Executors.newFixedThreadPool(2);
         
        for (int i = 0; i < 10; i++)
            executorService.execute(new TestTask());
         
        executorService.shutdown();
         
    }
}
  

Powered by:
博客园
Copyright © StringBuilder_Sun